#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "../controller/md_proto.h"
#include "../controller/table_proto.h"
#include "disk.h"
#include "../../cluster/dispatch/dispatch.h"
#include "../chunk/chunk_proto.h"
#include "../chunk/chunk_cleanup.h"
#include "../ynet/include/main_loop.h"
#include "etcd.h"
#include "md_map.h"
#include "md_parent.h"
#include "lich_md.h"
#include "md_root.h"
#include "ylog.h"
#include "dbg.h"

/*
 * One cached pool root: associates a pool name with the file id of that
 * pool's root. Entries are linked on rootid_t.list.
 */
typedef struct {
        /* List linkage. Must stay the first member: the list walkers in this
         * file cast a struct list_head pointer directly to rootid_entry_t. */
        struct list_head hook;
        /* Root id of the pool; filled in by md_root() before inited is set. */
        fileid_t rootid;
        /* Pool name, used as the lookup key (compared with strcmp). */
        char pool[MAX_NAME_LEN];
        /* 0 until md_root() has successfully loaded rootid, then 1. */
        int inited;
} rootid_entry_t;

/*
 * Container for all cached pool roots known to this process.
 */
typedef struct {
        /* Taken in write mode by md_root() and __md_root_del() around list access. */
        sy_rwlock_t lock;
        /* List of rootid_entry_t, linked through rootid_entry_t.hook. */
        struct list_head list;
} rootid_t;

/* Process-wide pool root cache; allocated and assigned by md_root_init(). */
static rootid_t *__rootid__;

/*
 * Build a fresh chkinfo for a new root chunk in @pool.
 *
 * Calls dispatch_newdisk() with a replica count initialised from
 * gloconf.metadata_replica and a minimum of 1 in solo mode (LICH_REPLICA_MIN
 * otherwise), then dispatch_newid() for the chunk id, and finally stores the
 * chosen disks and replica count into @chkinfo.
 *
 * EAGAIN / ENONET / ENOSYS from either call go through USLEEP_RETRY
 * (presumably sleep-and-retry, at most 5 attempts of 100ms -- confirm against
 * the macro definition). Both retry sites share one attempt counter.
 *
 * @return 0 on success, otherwise the error code of the failing call.
 */
static int __md_newinfo(const char *pool, chkinfo_t *chkinfo)
{
        int ret, retry = 0;
        int repnum = gloconf.metadata_replica;
        int repmin = cluster_is_solomode() ? 1 : LICH_REPLICA_MIN;
        diskid_t diskid[LICH_REPLICA_MAX];

retry2:
        ret = dispatch_newdisk(diskid, &repnum, repmin, pool, NULL, 0, 0);
        if (unlikely(ret)) {
                /* Anything other than a transient error is fatal. */
                if (ret != EAGAIN && ret != ENONET && ret != ENOSYS)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry2, retry, 5, (100 * 1000));
        }

        /* Only the fixed header is cleared; replica slots are filled below. */
        memset(chkinfo, 0x0, sizeof(*chkinfo));
retry1:
        ret = dispatch_newid(&chkinfo->id);
        if (unlikely(ret)) {
                if (ret != EAGAIN && ret != ENONET && ret != ENOSYS)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry1, retry, 5, (100 * 1000));
        }

        chkinfo->repnum = repnum;
        diskid2loc(chkinfo->diskid, diskid, repnum);

        return 0;
err_ret:
        return ret;
}

/*
 * Argument bundle handed to __md_create_root__() through main_loop_request().
 * The input pointers are borrowed from the caller of __md_create_table().
 */
typedef struct {
        const char *name;            /* passed as first argument to table_proto_create() */
        const chkinfo_t *chkinfo;    /* chunk info of the table to create */
        const fileid_t *parentid;    /* parent id passed to table_proto_create() */
        const fileinfo_t *fileinfo;  /* initial file info blob, sizeof(fileinfo_t) bytes */
        int retval;                  /* out: return value of table_proto_create() */
        sem_t sem;                   /* posted by __md_create_root__() once retval is set */
} ctx_t;

/*
 * Callback queued by __md_create_table() via main_loop_request().
 *
 * Runs table_proto_create() with the arguments carried in the ctx_t pointed
 * to by @arg, records the result in ctx->retval and posts ctx->sem so the
 * waiting requester can continue.
 */
static void __md_create_root__(void *arg)
{
        ctx_t *req = arg;

        req->retval = table_proto_create(req->name, req->chkinfo, req->parentid,
                                         net_getadmin(), TABLE_PROTO_HEAD,
                                         req->fileinfo, sizeof(fileinfo_t), NULL, 0);

        sem_post(&req->sem);
}

static int __md_create_table(const char *name, const chkinfo_t *chkinfo,
                             const fileinfo_t *fileinfo, const fileid_t *parentid)
{
        int ret;
        ctx_t ctx;

        ctx.name = name;
        ctx.chkinfo = chkinfo;
        ctx.parentid = parentid;
        ctx.fileinfo = fileinfo;

        ret = sem_init(&ctx.sem, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = main_loop_request(__md_create_root__, &ctx, "root_create");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sem_wait(&ctx.sem);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = ctx.retval;
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
        
        return 0;
err_ret:
        return ret;
}

/*
 * Create the root of pool @name and publish it.
 *
 * Steps: allocate chunk info (__md_newinfo), build the initial directory
 * attributes from gloconf, create the table (__md_create_table), store the
 * chunk info in etcd under ETCD_STORAGE/@name, and record the id -> first
 * replica node mapping via maping_set(ID2NID, ...).
 *
 * @param rootid  out: id of the newly created root
 * @param type    chunk type stored into the new id
 * @param name    pool name; also used as the etcd key
 * @param idx     forwarded to etcd_update() (presumably the expected etcd
 *                index for a conditional update -- confirm)
 * @return 0 on success; EREMCHG if etcd_update() fails for any reason;
 *         otherwise the error of the failing step.
 */
static int __md_create_root(fileid_t *rootid, chunk_type2str_t type, const char *name, const int *idx)
{
        int ret;
        char buf[MAX_BUF_LEN];
        char nidstr[MAX_NAME_LEN];
        chkinfo_t *chkinfo = (void *)buf;
        fileid_t parentid;
        setattr_t setattr;
        fileinfo_t fileinfo;

        ret = __md_newinfo(name, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkinfo->id.type = type;

        memset(&fileinfo, 0x0, sizeof(fileinfo));
        memset(&setattr, 0x0, sizeof(setattr));
        rootid_init(&parentid);

        /* Start from directory defaults, then apply the optional global settings. */
        md_initattr(&setattr, __S_IFDIR, gloconf.chunk_rep);
        if (gloconf.localize) {
                setattr.localize.val = gloconf.localize;
                setattr.localize.set_it = 1;
        }
        if (gloconf.writeback) {
                setattr.writeback.val = gloconf.writeback;
                setattr.writeback.set_it = 1;
        }
        if (gloconf.priority != -1) {
                setattr.priority.val = gloconf.priority;
                setattr.priority.set_it = 1;
        }
        if (gloconf.multipath) {
                setattr.multpath.val = gloconf.multipath;
                setattr.multpath.set_it = 1;
        }

        ret = md_proto_setattr(&fileinfo, &setattr, &chkinfo->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __md_create_table(name, chkinfo, &fileinfo, &parentid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_DUMP(chkinfo, D_INFO);

        ret = etcd_update(ETCD_STORAGE, name, chkinfo,
                          CHKINFO_SIZE(chkinfo->repnum), idx, 0);
        if (unlikely(ret)) {
                /* Every etcd_update() failure is reported as EREMCHG. */
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        /* Remember which node holds the first replica of the new root. */
        nid2str(nidstr, &chkinfo->diskid[0].id);
        ret = maping_set(ID2NID, id2str(&chkinfo->id), nidstr);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *rootid = chkinfo->id;

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/*
 * Read the root chunk info of pool @name from etcd (ETCD_STORAGE).
 *
 * @param rootid  out: root id, written only when a non-null id was found
 * @param name    pool name / etcd key
 * @param create  out: TRUE when the stored id is null (the caller is expected
 *                to create the root), FALSE otherwise
 * @param idx     out: filled in by etcd_get_bin() (presumably the etcd index
 *                of the key -- confirm)
 * @return 0 on success, otherwise the error from etcd_get_bin().
 *
 * EAGAIN / ENONET / ENOSYS are retried through USLEEP_RETRY only when
 * ng.daemon is not set; with ng.daemon set they fail immediately.
 *
 * The scratch buffer is released on every exit path (the "need to create"
 * path previously returned without freeing it).
 */
static int __md_load_root_remote(fileid_t *rootid, const char *name, int *create, int *idx)
{
        int ret, retry = 0;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        chkinfo_t *chkinfo;

        *create = FALSE;
        chkinfo = (void *)buf;
retry:
        ret = etcd_get_bin(ETCD_STORAGE, name,
                           chkinfo, NULL, idx);
        if (unlikely(ret)) {
                if (ret == EAGAIN || ret == ENONET || ret == ENOSYS) {
                        if (!ng.daemon) {
                                USLEEP_RETRY(err_ret, ret, retry, retry, 5, (100 * 1000));
                        } else
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        if (chkid_isnull(&chkinfo->id)) {
                /* Nothing stored yet: tell the caller to create the root. */
                *create = TRUE;
        } else {
                *rootid = chkinfo->id;
                YASSERT(chkid_isvalid(rootid));
                md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

                CHKINFO_DUMP(chkinfo, D_INFO);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}


/*
 * Resolve the root id of pool @name, creating the root if necessary.
 *
 * The local NAME2NID mapping is tried first. On a miss the id is read from
 * etcd (__md_load_root_remote); if etcd holds a null id the root is created
 * (__md_create_root). The result is then stored back into NAME2NID and the
 * parent of the root is registered through md_parent_update().
 *
 * @param rootid  out: resolved root id (asserted non-null on success)
 * @param type    chunk type used if the root has to be created
 * @param name    pool name
 * @return 0 on success, otherwise the error of the failing step.
 */
static int __md_load_root(fileid_t *rootid, chunk_type2str_t type, const char *name)
{
        int ret, create, idx;
        fileid_t parent;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        if (maping_get(NAME2NID, name, buf, NULL) == 0) {
                /* Cached locally: the mapping value is the id in string form. */
                str2chkid(rootid, buf);
        } else {
                ret = __md_load_root_remote(rootid, name, &create, &idx);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                if (create) {
                        ret = __md_create_root(rootid, type, name, &idx);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }
                }

                ret = maping_set(NAME2NID, name, id2str(rootid));
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                rootid_init(&parent);
                md_parent_update(rootid, &parent);
        }

        YASSERT(!chkid_isnull(rootid));
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Reverse lookup: find the pool whose stored root id equals @chkid.
 *
 * Lists every key under ETCD_STORAGE, fetches each chunk info and compares
 * its id against @chkid. The key of the first match is copied into @pool.
 *
 * @param chkid  root id to search for
 * @param pool   out: pool name; the copy is unbounded (strcpy), so the caller
 *               must supply a buffer large enough for any key
 * @return 0 on success, ENOKEY if no pool matches, otherwise the error from
 *         etcd_list()/etcd_get_bin().
 */
static int __md_root_get_pool(const chkid_t *chkid, char *pool)
{
        int ret, i, matched = 0;
        etcd_node_t *list, *node;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = etcd_list(ETCD_STORAGE, &list);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < list->num_node; i++) {
                node = list->nodes[i];

                ret = etcd_get_bin(ETCD_STORAGE, node->key, chkinfo,
                                   NULL, NULL);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                if (chkid_cmp(&chkinfo->id, chkid) != 0)
                        continue;

                strcpy(pool, node->key);
                matched = 1;
                break;
        }

        if (!matched) {
                ret = ENOKEY;
                GOTO(err_free, ret);
        }

        free_etcd_node(list);

        return 0;
err_free:
        free_etcd_node(list);
err_ret:
        return ret;
}

/*
 * Look up the chunk info of a pool root and bring the chunk online.
 *
 * @param _pool      pool name; an empty string means "find the pool by @chkid"
 * @param parent     parent id, forwarded to md_chunk_online()
 * @param chkid      expected root id
 * @param chkinfo    out: chunk info read from etcd
 * @param parentnid  optional out: set to *net_getadmin() on success
 * @return 0 on success; ENOENT if the stored id differs from @chkid;
 *         otherwise the error of the failing step. An EREMCHG from
 *         md_chunk_online() is retried once from the etcd read.
 */
int md_root_lookup(const char *_pool, const fileid_t *parent, const fileid_t *chkid,
                   chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret, retry = 0;
        const char *pool = _pool;
        char tmp[MAX_NAME_LEN];

        DBUG("lookup "CHKID_FORMAT", pool %s\n", CHKID_ARG(chkid), _pool);

        if (_pool[0] == '\0') {
                /* No pool given: resolve it from the root id. */
                ret = __md_root_get_pool(chkid, tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                pool = tmp;
        }

retry:
        ret = etcd_get_bin(ETCD_STORAGE, pool, chkinfo, NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        ret = md_chunk_online(pool, parent, chkinfo);
        if (unlikely(ret)) {
                if (ret != EREMCHG || retry != 0)
                        GOTO(err_ret, ret);

                /* The record changed underneath us: re-read it, once. */
                retry = 1;
                goto retry;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

        CHKINFO_DUMP(chkinfo, D_INFO);
        if (unlikely(chkid_cmp(chkid, &chkinfo->id))) {
                DERROR("invalid root "CHKID_FORMAT" --> "CHKID_FORMAT", pool %s %s\n",
                       CHKID_ARG(chkid), CHKID_ARG(&chkinfo->id), pool, _pool);
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (parentnid != NULL)
                *parentnid = *(net_getadmin());

        return 0;
err_ret:
        return ret;
}

/*
 * Read-only variant of md_root_lookup(): fetches the pool root's chunk info
 * from etcd and checks its id, without calling md_chunk_online() or
 * md_map_update().
 *
 * @param _pool      pool name; an empty string means "find the pool by @chkid"
 * @param parent     unused
 * @param chkid      expected root id
 * @param chkinfo    out: chunk info read from etcd
 * @param parentnid  optional out: set to *net_getadmin() on success
 * @return 0 on success; ENOENT if the stored id differs from @chkid;
 *         otherwise the error of the failing step.
 */
int md_root_lookup1(const char *_pool, const fileid_t *parent, const fileid_t *chkid,
                   chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret;
        const char *pool = _pool;
        char tmp[MAX_NAME_LEN];

        (void) parent;

        DBUG("lookup "CHKID_FORMAT", pool %s\n", CHKID_ARG(chkid), _pool);

        if (_pool[0] == '\0') {
                ret = __md_root_get_pool(chkid, tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                pool = tmp;
        }

        ret = etcd_get_bin(ETCD_STORAGE, pool, chkinfo, NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(chkid_cmp(chkid, &chkinfo->id))) {
                DWARN("invalid root "CHKID_FORMAT" --> "CHKID_FORMAT", pool %s %s\n",
                       CHKID_ARG(chkid), CHKID_ARG(&chkinfo->id), pool, _pool);
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (parentnid != NULL)
                *parentnid = *(net_getadmin());

        return 0;
err_ret:
        return ret;
}

int md_root_chunk_update(const char *pool, const chkinfo_t *_chkinfo,
                         const nid_t *owner, uint64_t info_version)
{
        int ret, idx;
        chkinfo_t *chkinfo;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        nid_t nid;

        YASSERT(ng.daemon);
        ANALYSIS_BEGIN(0);

        chkinfo = (void *)buf;
        ret = etcd_get_bin(ETCD_STORAGE, pool, chkinfo, NULL, &idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (nid_cmp(&chkinfo->diskid[0].id, owner)) {
                ret = lease_get(&chkinfo->id, &nid, NULL);
                if (ret) {
                        DWARN("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                              CHKID_ARG(&chkinfo->id), (LLU)_chkinfo->info_version, (LLU)info_version);
                        ret = EPERM;
                        GOTO(err_ret, ret);
                } else {
                        if (nid_cmp(owner, &nid) == 0) {
                                DINFO("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                                      CHKID_ARG(&chkinfo->id), (LLU)_chkinfo->info_version, (LLU)info_version);
                        } else {
                                DWARN("chunk "CHKID_FORMAT" info_version %llu : %llu lease %s\n",
                                      CHKID_ARG(&chkinfo->id), (LLU)_chkinfo->info_version, (LLU)info_version, network_rname(&nid));
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }
                } 
        }

        if (chkinfo->info_version != info_version) {
                DWARN("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                      CHKID_ARG(&chkinfo->id), (LLU)chkinfo->info_version, (LLU)info_version);
                ret = EPERM;
                GOTO(err_ret, ret);
        }
        
        YASSERT(chkid_cmp(&chkinfo->id, &_chkinfo->id) == 0);
        YASSERT(_chkinfo->info_version > info_version);
        
        DBUG("info_version %llu --> %llu\n", (LLU)_chkinfo->info_version,
              (LLU)chkinfo->info_version);

        ret = etcd_update(ETCD_STORAGE, pool, _chkinfo,
                          CHKINFO_SIZE(_chkinfo->repnum),
                          &idx, 0);
        if (unlikely(ret)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        DBUG("update %s index %u\n", pool, idx);
        CHKINFO_DUMP(chkinfo, D_INFO);

        ANALYSIS_END(0, 1000 * 100, NULL);
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Apply md_proto_reject() for @nid to the pool root's chunk info and write
 * the result back to etcd.
 *
 * @param pool       pool name / etcd key
 * @param chkinfo    out: buffer receiving the stored chunk info, modified in
 *                   place by md_proto_reject()
 * @param nid        node passed to md_proto_reject()
 * @param parentnid  optional out: set to *net_getadmin() on success
 * @return 0 on success; EREMCHG if etcd_update() fails; otherwise the error
 *         from etcd_get_bin() or md_proto_reject().
 */
int md_root_chunk_reject(const char *pool, chkinfo_t *chkinfo, const nid_t *nid, nid_t *parentnid)
{
        int ret, idx;

        ANALYSIS_BEGIN(0);

        ret = etcd_get_bin(ETCD_STORAGE, pool, chkinfo, NULL, &idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_proto_reject(chkinfo, nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* idx comes from the read above and is handed back to etcd_update(). */
        ret = etcd_update(ETCD_STORAGE, pool, chkinfo,
                          CHKINFO_SIZE(chkinfo->repnum), &idx, 0);
        if (unlikely(ret)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        DBUG("update %s index %u\n", pool, idx);
        CHKINFO_DUMP(chkinfo, D_INFO);

        ANALYSIS_END(0, 1000 * 100, NULL);

        if (parentnid != NULL)
                *parentnid = *net_getadmin();

        return 0;
err_ret:
        return ret;
}

/*
 * Fetch the pool root's chunk info from etcd and hand it to
 * md_proto_chunk_cleanup() together with @nid and @meta_version.
 *
 * @return 0 on success, otherwise the error from etcd_get_bin() or
 *         md_proto_chunk_cleanup().
 */
int md_root_chunk_cleanup(const char *pool, const nid_t *nid, uint64_t meta_version)
{
        int ret;
        char buf[MAX_BUF_LEN];
        chkinfo_t *chkinfo = (void *)buf;

        ANALYSIS_BEGIN(0);

        ret = etcd_get_bin(ETCD_STORAGE, pool, chkinfo, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_proto_chunk_cleanup(chkinfo, nid, meta_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_ret:
        return ret;
}

/*
 * Search @root's list for the entry whose pool name equals @pool.
 *
 * @param _ent  out: the matching entry, or NULL when there is none
 * @return always 0
 *
 * Takes no lock itself.
 */
static int __md_root_find(rootid_t *root, const char *pool, rootid_entry_t **_ent)
{
        struct list_head *cur;
        rootid_entry_t *candidate;

        *_ent = NULL;

        list_for_each(cur, &root->list) {
                /* hook is the first member, so the node pointer is the entry. */
                candidate = (rootid_entry_t *)cur;

                if (strcmp(candidate->pool, pool) == 0) {
                        *_ent = candidate;
                        break;
                }
        }

        return 0;
}

/*
 * Allocate a new, not-yet-initialised entry for @pool and append it to
 * @root's list.
 *
 * @param root  container to add the entry to
 * @param pool  pool name; must fit in rootid_entry_t.pool including the
 *              terminating NUL
 * @param _ent  optional out: the new entry
 * @return 0 on success, ENAMETOOLONG if @pool does not fit, otherwise the
 *         error from ymalloc().
 *
 * Takes no lock itself.
 */
static int __md_root_create(rootid_t *root, const char *pool, rootid_entry_t **_ent)
{
        int ret;
        size_t len;
        rootid_entry_t *ent;

        /* Reject names that would overflow the fixed-size pool field. */
        len = strlen(pool);
        if (len >= sizeof(ent->pool)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&ent, sizeof(*ent));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(ent, 0x0, sizeof(*ent));

        memcpy(ent->pool, pool, len + 1);
        ent->inited = 0;

        list_add_tail(&ent->hook, &root->list);

        if (_ent)
                *_ent = ent;

        return 0;
err_ret:
        return ret;
}

/*
 * Remove and free the cached entry for @pool, if one exists.
 *
 * Takes @root's lock in write mode for the duration of the list walk.
 *
 * @return 0 whether or not an entry was found; otherwise the error from
 *         sy_rwlock_wrlock().
 */
static int __md_root_del(rootid_t *root, const char *pool)
{
        int ret;
        struct list_head *cur, *next;
        rootid_entry_t *victim;

        ret = sy_rwlock_wrlock(&root->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(cur, next, &root->list) {
                victim = (rootid_entry_t *)cur;

                if (strcmp(victim->pool, pool) != 0)
                        continue;

                /* Unlink before freeing; only the first match is removed. */
                list_del(&victim->hook);
                yfree((void **)&victim);
                break;
        }

        sy_rwlock_unlock(&root->lock);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Delete a storage pool.
 *
 * Drops the pool's NAME2NID mapping, removes the cached root entry and then
 * cleans up the pool's disks. Stops at the first step that fails.
 *
 * @param pool name of the pool to delete
 * @return 0 on success, otherwise the error of the failing step
 */
int md_root_del(const char *pool)
{
        int ret;

        DINFO("drop pool %s\n", pool);

        ret = maping_drop(NAME2NID, pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Remove the in-memory structure for the pool's "/" (root). */
        ret = __md_root_del(__rootid__, pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Clean up all disks that belong to the pool. */
        ret = diskmd_pool_cleanup(pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Find the cached entry for @pool, creating an uninitialised one when it
 * does not exist yet.
 *
 * @param _ent  optional out: the found or newly created entry
 * @return 0 on success, otherwise the error from __md_root_find() or
 *         __md_root_create().
 *
 * Takes no lock itself.
 */
static int __md_root_get(rootid_t *root, const char *pool, rootid_entry_t **_ent)
{
        int ret;
        rootid_entry_t *entry;

        ret = __md_root_find(root, pool, &entry);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (entry == NULL) {
                ret = __md_root_create(root, pool, &entry);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if (_ent != NULL)
                *_ent = entry;

        return 0;
err_ret:
        return ret;
}

/*
 * Return the root id of @pool, loading (and if necessary creating) it on
 * first use and caching it afterwards.
 *
 * The whole operation, including the first-time load, runs under the cache's
 * write lock.
 *
 * @param rootid  out: root id of the pool (asserted non-null on success)
 * @param pool    pool name
 * @return 0 on success, otherwise the error of the failing step. If the load
 *         fails the entry stays in the cache with inited == 0, so the next
 *         call tries again.
 */
int md_root(fileid_t *rootid, const char *pool)
{
        int ret;
        rootid_t *cache = __rootid__;
        rootid_entry_t *entry;

        ret = sy_rwlock_wrlock(&cache->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __md_root_get(cache, pool, &entry);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        if (!entry->inited) {
                ret = __md_load_root(&entry->rootid, __POOL_CHUNK__, entry->pool);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                entry->inited = 1;
        }

        *rootid = entry->rootid;
        YASSERT(!chkid_isnull(rootid));

        sy_rwlock_unlock(&cache->lock);

        return 0;
err_lock:
        sy_rwlock_unlock(&cache->lock);
err_ret:
        return ret;
}

/*
 * Allocate the process-wide pool root cache, initialise its lock and empty
 * list, and publish it through __rootid__.
 *
 * @return 0 on success, otherwise the error from ymalloc() or
 *         sy_rwlock_init(); on lock failure the allocation is released.
 */
int md_root_init()
{
        int ret;
        rootid_t *cache;

        ret = ymalloc((void **)&cache, sizeof(*cache));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_rwlock_init(&cache->lock, "md_root.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        INIT_LIST_HEAD(&cache->list);

        /* Publish only after the structure is fully set up. */
        __rootid__ = cache;

        return 0;
err_free:
        yfree((void **)&cache);
err_ret:
        return ret;
}
